Потоки — полное руководство

Узнайте, как использовать доступные для чтения, записи и преобразования потоки с помощью API потоков.

API потоков позволяет вам программно получать доступ к потокам данных, полученным по сети или созданным любыми средствами локально, и обрабатывать их с помощью JavaScript. Потоковая передача подразумевает разбиение ресурса, который вы хотите получить, отправить или преобразовать, на небольшие фрагменты, а затем пошаговую обработку этих фрагментов. Хотя потоковая передача — это то, что браузеры делают в любом случае при получении ресурсов, таких как HTML или видео, для показа на веб-страницах, эта возможность никогда не была доступна JavaScript до появления функции fetch with streams в 2015 году.

Раньше, если вы хотели обработать какой-либо ресурс (будь то видео, текстовый файл и т. д.), вам приходилось загружать весь файл, ждать, пока он будет десериализован в подходящий формат, а затем обрабатывать его. С появлением потоков в JavaScript все это меняется. Теперь вы можете обрабатывать необработанные данные с помощью JavaScript постепенно, как только они становятся доступны на клиенте, без необходимости генерировать буфер, строку или blob. Это открывает ряд вариантов использования, некоторые из которых я перечисляю ниже:

  • Видеоэффекты: передача читаемого видеопотока через поток преобразования, который применяет эффекты в реальном времени.
  • (Де)компрессия данных: передача потока файлов через поток преобразования, который выборочно (де)компрессирует его.
  • Декодирование изображений: передача потока ответа HTTP через поток преобразования, который декодирует байты в данные битовой карты, а затем через другой поток преобразования, который переводит битовые карты в PNG. Если установлено внутри обработчика fetch сервисного работника, это позволяет вам прозрачно заполнять новые форматы изображений, такие как AVIF.

Поддержка браузера

ReadableStream и WritableStream

Browser Support

  • Хром: 43.
  • Край: 14.
  • Firefox: 65.
  • Сафари: 10.1.

Source

TransformStream

Browser Support

  • Хром: 67.
  • Край: 79.
  • Firefox: 102.
  • Сафари: 14.1.

Source

Основные концепции

Прежде чем подробно рассказать о различных типах потоков, позвольте мне представить некоторые основные концепции.

Куски

Кусок — это отдельный фрагмент данных , который записывается в поток или считывается из него. Он может быть любого типа; потоки могут даже содержать куски разных типов. В большинстве случаев кусок не будет самой атомарной единицей данных для данного потока. Например, поток байтов может содержать куски, состоящие из 16 KiB Uint8Array единиц, вместо отдельных байтов.

Читаемые потоки

Читаемый поток представляет собой источник данных, из которого вы можете читать. Другими словами, данные выходят из читаемого потока. Конкретно, читаемый поток является экземпляром класса ReadableStream .

Записываемые потоки

Записываемый поток представляет собой место назначения для данных, в которые можно записывать. Другими словами, данные поступают в записываемый поток. Конкретно, записываемый поток является экземпляром класса WritableStream .

Трансформировать потоки

Поток преобразования состоит из пары потоков : записываемого потока, называемого его записываемой стороной, и читаемого потока, называемого его читаемой стороной. Реальной метафорой для этого был бы синхронный переводчик , который переводит с одного языка на другой на лету. В манере, специфичной для потока преобразования, запись в записываемую сторону приводит к тому, что новые данные становятся доступными для чтения с читаемой стороны. Конкретно, любой объект с writable свойством и readable свойством может служить потоком преобразования. Однако стандартный класс TransformStream упрощает создание такой пары, которая должным образом запутана.

Цепи труб

Потоки в основном используются путем их передачи друг другу. Читаемый поток может быть передан напрямую записываемому потоку с помощью метода pipeTo() читаемого потока, или его можно сначала передать через один или несколько потоков преобразования с помощью метода pipeThrough() читаемого потока. Набор потоков, соединенных таким образом, называется цепочкой каналов.

Противодавление

После того, как цепочка труб построена, она будет распространять сигналы относительно того, как быстро должны проходить через нее куски. Если какой-либо шаг в цепочке еще не может принимать куски, он распространяет сигнал обратно по цепочке труб, пока в конечном итоге исходный источник не получит команду прекратить производить куски так быстро. Этот процесс нормализации потока называется противодавлением.

Тиинг

Читаемый поток может быть teed (назван в честь формы заглавной буквы 'T') с помощью его метода tee() . Это заблокирует поток, то есть сделает его больше недоступен для непосредственного использования; однако это создаст два новых потока , называемых ветвями, которые могут быть потреблены независимо. Teeing также важен, потому что потоки нельзя перемотать или перезапустить, подробнее об этом позже.

Схема цепочки каналов, состоящей из читаемого потока, поступающего из вызова API fetch, который затем передается через поток преобразования, вывод которого teed, а затем отправляется в браузер для первого результирующего читаемого потока и в кэш сервисного рабочего для второго результирующего читаемого потока.
Цепь из труб.

Механика читаемого потока

Читаемый поток — это источник данных, представленный в JavaScript объектом ReadableStream , который вытекает из базового источника. Конструктор ReadableStream() создает и возвращает читаемый объект потока из заданных обработчиков. Существует два типа базового источника:

  • Источники push постоянно отправляют вам данные, когда вы к ним обращаетесь, и вам решать, начинать, приостанавливать или отменять доступ к потоку. Примерами могут служить потоки живого видео, события, отправленные сервером, или WebSockets.
  • Источники pull требуют, чтобы вы явно запрашивали данные из них после подключения. Примерами служат операции HTTP через вызовы fetch() или XMLHttpRequest .

Потоковые данные считываются последовательно небольшими порциями, называемыми кусками . Куски, помещенные в поток, называются поставленными в очередь . Это означает, что они ждут в очереди, готовые к чтению. Внутренняя очередь отслеживает куски, которые еще не были прочитаны.

Стратегия очередей — это объект, который определяет, как поток должен сигнализировать о противодавлении на основе состояния своей внутренней очереди. Стратегия очередей назначает размер каждому фрагменту и сравнивает общий размер всех фрагментов в очереди с указанным числом, известным как верхняя отметка .

Части внутри потока считываются считывателем . Этот считыватель извлекает данные по одной части за раз, позволяя вам выполнять любые операции, которые вы хотите с ними сделать. Считыватель плюс другой код обработки, который идет вместе с ним, называется потребителем .

Следующая конструкция в этом контексте называется контроллером . Каждый читаемый поток имеет связанный с ним контроллер, который, как следует из названия, позволяет управлять потоком.

Только один читатель может читать поток одновременно; когда читатель создается и начинает читать поток (то есть становится активным читателем ), он привязывается к нему. Если вы хотите, чтобы другой читатель взял на себя чтение вашего потока, вам обычно нужно освободить первого читателя, прежде чем вы сделаете что-либо еще (хотя вы можете создавать потоки).

Создание читаемого потока

Вы создаете читаемый поток, вызывая его конструктор ReadableStream() . Конструктор имеет необязательный аргумент underlyingSource , который представляет объект с методами и свойствами, которые определяют, как будет вести себя сконструированный экземпляр потока.

underlyingSource

При этом могут использоваться следующие необязательные методы, определяемые разработчиком:

  • start(controller) : Вызывается немедленно после создания объекта. Метод может получить доступ к источнику потока и сделать все необходимое для настройки функциональности потока. Если этот процесс должен выполняться асинхронно, метод может вернуть обещание, чтобы сообщить об успехе или неудаче. Параметр controller , переданный этому методу, — ReadableStreamDefaultController .
  • pull(controller) : может использоваться для управления потоком по мере извлечения большего количества фрагментов. Он вызывается многократно до тех пор, пока внутренняя очередь фрагментов потока не заполнится, пока очередь не достигнет своей верхней отметки. Если результатом вызова pull() является обещание, pull() не будет вызван снова, пока указанное обещание не будет выполнено. Если обещание будет отклонено, поток станет ошибочным.
  • cancel(reason) : вызывается, когда потребитель потока отменяет поток.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController поддерживает следующие методы:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Вторым, также необязательным, аргументом конструктора ReadableStream() является queuingStrategy . Это объект, который опционально определяет стратегию очереди для потока, которая принимает два параметра:

  • highWaterMark : неотрицательное число, указывающее максимальную отметку уровня воды в потоке, использующем данную стратегию очередизации.
  • size(chunk) : Функция, которая вычисляет и возвращает конечный неотрицательный размер заданного значения фрагмента. Результат используется для определения обратного давления, проявляющегося через соответствующее свойство ReadableStreamDefaultController.desiredSize . Он также управляет тем, когда вызывается метод pull() базового источника.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Методы getReader() и read()

Для чтения из читаемого потока вам нужен читатель, который будет ReadableStreamDefaultReader . Метод getReader() интерфейса ReadableStream создает читатель и блокирует поток для него. Пока поток заблокирован, никакой другой читатель не может быть получен, пока этот не будет освобожден.

Метод read() интерфейса ReadableStreamDefaultReader возвращает обещание, предоставляющее доступ к следующему фрагменту во внутренней очереди потока. Он выполняет или отклоняет его с результатом в зависимости от состояния потока. Различные возможности следующие:

  • Если фрагмент доступен, обещание будет выполнено с объектом формы
    { value: chunk, done: false } .
  • Если поток закроется, обещание будет выполнено с объектом формы
    { value: undefined, done: true } .
  • Если поток окажется ошибочным, обещание будет отклонено с соответствующей ошибкой.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked собственность

Проверить, заблокирован ли читаемый поток, можно, обратившись к его свойству ReadableStream.locked .

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Примеры читаемого потокового кода

В примере кода ниже показаны все шаги в действии. Сначала вы создаете ReadableStream , который в своем аргументе underlyingSource (то есть, класс TimestampSource ) определяет метод start() . Этот метод сообщает controller потока о необходимости enqueue() временную метку каждую секунду в течение десяти секунд. Наконец, он сообщает контроллеру о необходимости close() поток. Вы используете этот поток, создавая считыватель с помощью метода getReader() и вызывая read() до тех пор, пока поток не будет done .

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Асинхронная итерация

Проверка на каждой итерации цикла read() , done ли поток, может быть не самым удобным API. К счастью, скоро появится лучший способ сделать это: асинхронная итерация.

for await (const chunk of stream) {
  console.log(chunk);
}

Обходным решением для использования асинхронной итерации сегодня является реализация поведения с помощью полифилла.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Создание читаемого потока

Метод tee() интерфейса ReadableStream разветвляет текущий читаемый поток, возвращая двухэлементный массив, содержащий две результирующие ветви в качестве новых экземпляров ReadableStream . Это позволяет двум читателям одновременно читать поток. Вы можете сделать это, например, в service worker, если хотите получить ответ с сервера и передать его в браузер, а также передать его в кэш service worker. Поскольку тело ответа не может быть использовано более одного раза, для этого вам понадобятся две копии. Чтобы отменить поток, вам нужно отменить обе результирующие ветви. Разветвление потока обычно блокирует его на время, не позволяя другим читателям заблокировать его.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Читаемые потоки байтов

Для потоков, представляющих байты, предоставляется расширенная версия читаемого потока для эффективной обработки байтов, в частности, за счет минимизации копий. Байтовые потоки позволяют получать считыватели «принеси свой буфер» (BYOB). Реализация по умолчанию может давать ряд различных выходных данных, таких как строки или буферы массивов в случае WebSockets, тогда как байтовые потоки гарантируют байтовый вывод. Кроме того, считыватели BYOB имеют преимущества стабильности. Это связано с тем, что если буфер отсоединяется, он может гарантировать, что запись в один и тот же буфер не будет производиться дважды, тем самым избегая условий гонки. Считыватели BYOB могут сократить количество раз, когда браузеру необходимо запустить сборку мусора, поскольку он может повторно использовать буферы.

Создание читаемого потока байтов

Вы можете создать читаемый поток байтов, передав дополнительный параметр type в конструктор ReadableStream() .

new ReadableStream({ type: 'bytes' });

underlyingSource

Базовому источнику считываемого потока байтов предоставляется ReadableByteStreamController для управления. Его метод ReadableByteStreamController.enqueue() принимает аргумент chunk значением которого является ArrayBufferView . Свойство ReadableByteStreamController.byobRequest возвращает текущий запрос на извлечение BYOB или null, если его нет. Наконец, свойство ReadableByteStreamController.desiredSize возвращает желаемый размер для заполнения внутренней очереди контролируемого потока.

queuingStrategy

Вторым, также необязательным, аргументом конструктора ReadableStream() является queuingStrategy . Это объект, который опционально определяет стратегию очереди для потока, который принимает один параметр:

  • highWaterMark : Неотрицательное число байтов, указывающее верхнюю отметку потока, использующего эту стратегию очередизации. Используется для определения обратного давления, проявляющегося через соответствующее свойство ReadableByteStreamController.desiredSize . Также управляет тем, когда вызывается метод pull() базового источника.

Методы getReader() и read()

Затем вы можете получить доступ к ReadableStreamBYOBReader , установив параметр mode соответствующим образом: ReadableStream.getReader({ mode: "byob" }) . Это позволяет более точно контролировать распределение буфера, чтобы избежать копий. Чтобы прочитать из потока байтов, вам нужно вызвать ReadableStreamBYOBReader.read(view) , где view — это ArrayBufferView .

Пример кода читаемого потока байтов

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Следующая функция возвращает читаемые потоки байтов, которые позволяют эффективное чтение с нулевым копированием случайно сгенерированного массива. Вместо использования предопределенного размера фрагмента 1024, она пытается заполнить предоставленный разработчиком буфер, что обеспечивает полный контроль.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Механика записываемого потока

Записываемый поток — это место назначения, в которое можно записывать данные, представленное в JavaScript объектом WritableStream . Он служит абстракцией поверх базового стока — стока ввода-вывода более низкого уровня, в который записываются необработанные данные.

Данные записываются в поток через writer , по одному фрагменту за раз. Фрагмент может принимать множество форм, как и фрагменты в reader. Вы можете использовать любой код, который вам нравится, чтобы создавать фрагменты, готовые к записи; writer плюс связанный с ним код называются produce .

Когда создается писатель и начинает писать в поток ( активный писатель ), говорят, что он заблокирован для него. Только один писатель может писать в записываемый поток одновременно. Если вы хотите, чтобы другой писатель начал писать в ваш поток, вам обычно нужно освободить его, прежде чем вы затем присоедините к нему другого писателя.

Внутренняя очередь отслеживает фрагменты, которые были записаны в поток, но еще не обработаны базовым приемником.

Стратегия очередей — это объект, который определяет, как поток должен сигнализировать о противодавлении на основе состояния своей внутренней очереди. Стратегия очередей назначает размер каждому фрагменту и сравнивает общий размер всех фрагментов в очереди с указанным числом, известным как верхняя отметка .

Последняя конструкция называется контроллером . Каждый записываемый поток имеет связанный с ним контроллер, который позволяет управлять потоком (например, прерывать его).

Создание записываемого потока

Интерфейс WritableStream API потоков предоставляет стандартную абстракцию для записи потоковых данных в пункт назначения, известный как приемник. Этот объект поставляется со встроенным обратным давлением и очередями. Вы создаете записываемый поток, вызывая его конструктор WritableStream() . Он имеет необязательный параметр underlyingSink , который представляет объект с методами и свойствами, определяющими, как будет вести себя сконструированный экземпляр потока.

underlyingSink

underlyingSink может включать следующие необязательные методы, определяемые разработчиком. Параметр controller , передаваемый некоторым методам, — это WritableStreamDefaultController .

  • start(controller) : Этот метод вызывается немедленно при создании объекта. Содержимое этого метода должно быть направлено на получение доступа к базовому приемнику. Если этот процесс должен выполняться асинхронно, он может вернуть обещание, чтобы сигнализировать об успехе или неудаче.
  • write(chunk, controller) : этот метод будет вызван, когда новый фрагмент данных (указанный в параметре chunk ) будет готов к записи в базовый приемник. Он может вернуть обещание, чтобы сообщить об успешности или неудаче операции записи. Этот метод будет вызван только после того, как предыдущие записи были успешными, и никогда после того, как поток будет закрыт или прерван.
  • close(controller) : этот метод будет вызван, если приложение подаст сигнал о том, что оно завершило запись фрагментов в поток. Содержимое должно сделать все необходимое для завершения записи в базовый приемник и освободить доступ к нему. Если этот процесс асинхронный, он может вернуть обещание, чтобы сообщить об успехе или неудаче. Этот метод будет вызван только после того, как все записи в очереди будут успешными.
  • abort(reason) : этот метод будет вызван, если приложение сигнализирует, что оно хочет внезапно закрыть поток и перевести его в ошибочное состояние. Он может очистить любые удерживаемые ресурсы, как close() , но abort() будет вызван, даже если записи поставлены в очередь. Эти фрагменты будут отброшены. Если этот процесс асинхронный, он может вернуть обещание, чтобы сообщить об успехе или неудаче. Параметр reason содержит DOMString , описывающий причину прерывания потока.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Интерфейс WritableStreamDefaultController API потоков представляет собой контроллер, позволяющий управлять состоянием WritableStream во время настройки, по мере отправки большего количества фрагментов для записи или в конце записи. При создании WritableStream базовому приемнику предоставляется соответствующий экземпляр WritableStreamDefaultController для управления. WritableStreamDefaultController имеет только один метод: WritableStreamDefaultController.error() , который приводит к ошибке любого будущего взаимодействия со связанным потоком. WritableStreamDefaultController также поддерживает свойство signal , которое возвращает экземпляр AbortSignal , позволяя при необходимости останавливать операцию WritableStream .

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Вторым, также необязательным, аргументом конструктора WritableStream() является queuingStrategy . Это объект, который опционально определяет стратегию очереди для потока, которая принимает два параметра:

  • highWaterMark : неотрицательное число, указывающее максимальную отметку уровня воды в потоке, использующем данную стратегию очередизации.
  • size(chunk) : Функция, которая вычисляет и возвращает конечный неотрицательный размер заданного значения chunk. Результат используется для определения обратного давления, проявляющегося через соответствующее свойство WritableStreamDefaultWriter.desiredSize .

Методы getWriter() и write()

Для записи в записываемый поток вам нужен писатель, который будет WritableStreamDefaultWriter . Метод getWriter() интерфейса WritableStream возвращает новый экземпляр WritableStreamDefaultWriter и блокирует поток для этого экземпляра. Пока поток заблокирован, никакой другой писатель не может быть получен, пока текущий не будет освобожден.

Метод write() интерфейса WritableStreamDefaultWriter записывает переданный фрагмент данных в WritableStream и его базовый приемник, затем возвращает обещание, которое разрешается, чтобы указать на успех или неудачу операции записи. Обратите внимание, что то, что означает «успех», зависит от базового приемника; это может означать, что фрагмент был принят, и не обязательно, что он безопасно сохранен в конечном месте назначения.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked собственность

Проверить, заблокирован ли доступный для записи поток, можно, обратившись к его свойству WritableStream.locked .

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Пример кода записываемого потока

В примере кода ниже показаны все шаги в действии.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Передача читаемого потока в записываемый поток

Читаемый поток может быть передан в записываемый поток с помощью метода pipeTo() читаемого потока. ReadableStream.pipeTo() передает текущий ReadableStream в заданный WritableStream и возвращает обещание, которое выполняется, если процесс передачи завершается успешно, или отклоняется, если обнаружены какие-либо ошибки.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Создание потока преобразования

Интерфейс TransformStream API Streams представляет собой набор преобразуемых данных. Вы создаете поток преобразования, вызывая его конструктор TransformStream() , который создает и возвращает объект потока преобразования из указанных обработчиков. Конструктор TransformStream() принимает в качестве своего первого аргумента необязательный объект JavaScript, представляющий transformer . Такие объекты могут содержать любой из следующих методов:

transformer

  • start(controller) : этот метод вызывается немедленно после создания объекта. Обычно он используется для постановки в очередь префиксных фрагментов с помощью controller.enqueue() . Эти фрагменты будут считываться с читаемой стороны, но не зависят от каких-либо записей на записываемую сторону. Если этот начальный процесс асинхронный, например, потому что требуются некоторые усилия для получения префиксных фрагментов, функция может вернуть обещание, чтобы сообщить об успехе или неудаче; отклоненное обещание приведет к ошибке потока. Любые выданные исключения будут повторно выданы конструктором TransformStream() .
  • transform(chunk, controller) : этот метод вызывается, когда новый фрагмент, изначально записанный на записываемую сторону, готов к преобразованию. Реализация потока гарантирует, что эта функция будет вызвана только после успешного выполнения предыдущих преобразований, и никогда до завершения start() или после вызова flush() . Эта функция выполняет фактическую работу по преобразованию потока преобразования. Она может ставить результаты в очередь с помощью controller.enqueue() . Это позволяет одному фрагменту, записанному на записываемую сторону, приводить к нулю или нескольким фрагментам на читаемой стороне в зависимости от того, сколько раз вызывается controller.enqueue() . Если процесс преобразования асинхронный, эта функция может возвращать обещание, чтобы сообщить об успешности или неудаче преобразования. Отклоненное обещание приведет к ошибке как читаемой, так и записываемой сторон потока преобразования. Если метод transform() не указан, используется тождественное преобразование, которое ставит в очередь неизмененные фрагменты с записываемой стороны на читаемую сторону.
  • flush(controller) : этот метод вызывается после того, как все фрагменты, записанные на записываемую сторону, были преобразованы путем успешного прохождения через transform() , и записываемая сторона вот-вот будет закрыта. Обычно это используется для постановки в очередь фрагментов суффикса на читаемую сторону, прежде чем она тоже будет закрыта. Если процесс очистки асинхронный, функция может вернуть обещание, чтобы сообщить об успехе или неудаче; результат будет передан вызывающему stream.writable.write() . Кроме того, отклоненное обещание приведет к ошибке как на читаемой, так и на записываемой стороне потока. Выдача исключения обрабатывается так же, как и возврат отклоненного обещания.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Стратегии очередей writableStrategy и readableStrategy

Второй и третий необязательные параметры конструктора TransformStream() — это необязательные стратегии очередей writableStrategy и readableStrategy . Они определены, как описано в разделах readable и writable stream соответственно.

Пример кода потока преобразования

В следующем примере кода показан простой поток преобразования в действии.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Передача читаемого потока через поток преобразования

Метод pipeThrough() интерфейса ReadableStream обеспечивает цепочечный способ конвейеризации текущего потока через поток преобразования или любую другую пару для записи/чтения. Конвейеризация потока обычно блокирует его на время работы конвейера, не давая другим читателям заблокировать его.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Следующий пример кода (немного надуманный) показывает, как можно реализовать "кричащую" версию fetch() , которая преобразует весь текст в верхний регистр, используя возвращаемое обещание ответа как поток и преобразуя в верхний регистр фрагмент за фрагментом. Преимущество этого подхода в том, что вам не нужно ждать загрузки всего документа, что может иметь огромное значение при работе с большими файлами.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Демо

Демонстрация ниже демонстрирует читаемые, записываемые и преобразуемые потоки в действии. Она также включает примеры цепочек каналов pipeThrough() и pipeTo() , а также демонстрирует tee() . Вы можете по желанию запустить демонстрацию в ее собственном окне или просмотреть исходный код .

Полезные трансляции доступны в браузере

Есть ряд полезных потоков, встроенных прямо в браузер. Вы можете легко создать ReadableStream из blob. Метод stream() интерфейса Blob возвращает ReadableStream , который при чтении возвращает данные, содержащиеся в blob. Также помните, что объект File — это особый вид Blob и может использоваться в любом контексте, в котором может использоваться blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Потоковые варианты TextDecoder.decode() и TextEncoder.encode() называются TextDecoderStream и TextEncoderStream соответственно.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Сжатие или распаковка файла легко выполняется с помощью потоков преобразования CompressionStream и DecompressionStream соответственно. Пример кода ниже показывает, как можно загрузить спецификацию Streams, сжать (gzip) ее прямо в браузере и записать сжатый файл непосредственно на диск.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream API доступа к файловой системе и экспериментальные потоки запросов fetch() являются примерами доступных для записи потоков в реальной жизни.

Последовательный API активно использует как читаемые, так и записываемые потоки.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Наконец, API WebSocketStream интегрирует потоки с API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Полезные ресурсы

Благодарности

Эту статью рецензировали Джейк Арчибальд , Франсуа Бофорт , Сэм Даттон , Маттиас Буэленс , Сурма , Джо Медли и Адам Райс . Записи в блоге Джейка Арчибальда очень помогли мне в понимании потоков. Некоторые примеры кода вдохновлены исследованиями пользователя GitHub @bellbind , а части текста в значительной степени основаны на документах MDN Web Docs по потокам . Авторы Streams Standard проделали колоссальную работу по написанию этой спецификации. Изображение главного героя Райана Лары на Unsplash .